{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "from synapse.ml.core.platform import running_on_synapse, find_secret\n",
    "\n",
    "# Bootstrap Spark Session\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "if running_on_synapse():\n",
    "    from notebookutils.visualization import display\n",
    "\n",
    "cognitive_key = find_secret(\"cognitive-api-key\")\n",
    "cognitive_location = \"eastus\"\n",
    "\n",
    "translator_key = find_secret(\"translator-key\")\n",
    "translator_location = \"eastus\"\n",
    "\n",
    "search_key = find_secret(\"azure-search-key\")\n",
    "search_service = \"mmlspark-azure-search\"\n",
    "search_index = \"form-demo-index\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import StringType\n",
    "\n",
    "\n",
    "def blob_to_url(blob):\n",
    "    [prefix, postfix] = blob.split(\"@\")\n",
    "    container = prefix.split(\"/\")[-1]\n",
    "    split_postfix = postfix.split(\"/\")\n",
    "    account = split_postfix[0]\n",
    "    filepath = \"/\".join(split_postfix[1:])\n",
    "    return \"https://{}/{}/{}\".format(account, container, filepath)\n",
    "\n",
    "\n",
    "df2 = (\n",
    "    spark.read.format(\"binaryFile\")\n",
    "    .load(\"wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*\")\n",
    "    .select(\"path\")\n",
    "    .limit(10)\n",
    "    .select(udf(blob_to_url, StringType())(\"path\").alias(\"url\"))\n",
    "    .cache()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(df2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "<embed src=\"https://mmlsparkdemo.blob.core.windows.net/ignite2021/form_svgs/Invoice11205.svg\" width=\"40%\"/>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.cognitive import AnalyzeInvoices\n",
    "\n",
    "analyzed_df = (\n",
    "    AnalyzeInvoices()\n",
    "    .setSubscriptionKey(cognitive_key)\n",
    "    .setLocation(cognitive_location)\n",
    "    .setImageUrlCol(\"url\")\n",
    "    .setOutputCol(\"invoices\")\n",
    "    .setErrorCol(\"errors\")\n",
    "    .setConcurrency(5)\n",
    "    .transform(df2)\n",
    "    .cache()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(analyzed_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.cognitive import FormOntologyLearner\n",
    "\n",
    "organized_df = (\n",
    "    FormOntologyLearner()\n",
    "    .setInputCol(\"invoices\")\n",
    "    .setOutputCol(\"extracted\")\n",
    "    .fit(analyzed_df)\n",
    "    .transform(analyzed_df)\n",
    "    .select(\"url\", \"extracted.*\")\n",
    "    .cache()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(organized_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import explode, col\n",
    "\n",
    "itemized_df = (\n",
    "    organized_df.select(\"*\", explode(col(\"Items\")).alias(\"Item\"))\n",
    "    .drop(\"Items\")\n",
    "    .select(\"Item.*\", \"*\")\n",
    "    .drop(\"Item\")\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(itemized_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(itemized_df.where(col(\"ProductCode\") == 48))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.cognitive import Translate\n",
    "\n",
    "translated_df = (\n",
    "    Translate()\n",
    "    .setSubscriptionKey(translator_key)\n",
    "    .setLocation(translator_location)\n",
    "    .setTextCol(\"Description\")\n",
    "    .setErrorCol(\"TranslationError\")\n",
    "    .setOutputCol(\"output\")\n",
    "    .setToLanguage([\"zh-Hans\", \"fr\", \"ru\", \"cy\"])\n",
    "    .setConcurrency(5)\n",
    "    .transform(itemized_df)\n",
    "    .withColumn(\"Translations\", col(\"output.translations\")[0])\n",
    "    .drop(\"output\", \"TranslationError\")\n",
    "    .cache()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(translated_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.cognitive import *\n",
    "from pyspark.sql.functions import monotonically_increasing_id, lit\n",
    "\n",
    "(\n",
    "    translated_df.withColumn(\"DocID\", monotonically_increasing_id().cast(\"string\"))\n",
    "    .withColumn(\"SearchAction\", lit(\"upload\"))\n",
    "    .writeToAzureSearch(\n",
    "        subscriptionKey=search_key,\n",
    "        actionCol=\"SearchAction\",\n",
    "        serviceName=search_service,\n",
    "        indexName=search_index,\n",
    "        keyCol=\"DocID\",\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "url = \"https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06\".format(\n",
    "    search_service, search_index\n",
    ")\n",
    "requests.post(url, json={\"search\": \"door\"}, headers={\"api-key\": search_key}).json()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "description": null,
  "kernelspec": {
   "display_name": "Synapse PySpark",
   "name": "synapse_pyspark"
  },
  "language_info": {
   "name": "python"
  },
  "save_output": true
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
